#!/usr/bin/python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License
#

# Run the C examples and verify that they behave as expected.
# Example executables must be in PATH

from test_unittest import unittest

from test_subprocess import Popen, Server, TestProcessError, check_output

# Default message count used throughout these tests.
MESSAGES = 10


def receive_expect_messages(n=MESSAGES):
    """Return the expected per-message output lines for sequence numbers 1..n."""
    lines = []
    for seq in range(1, n + 1):
        lines.append('{"sequence"=%s}\n' % seq)
    return ''.join(lines)


def receive_expect_total(n=MESSAGES):
    """Return the expected summary line reporting n received messages."""
    summary = "%s messages received\n" % n
    return summary


def receive_expect(n=MESSAGES):
    """Return the full expected receiver output: n message lines, then the summary."""
    body = receive_expect_messages(n)
    footer = receive_expect_total(n)
    return body + footer


def send_expect(n=MESSAGES):
    """Return the expected sender output after n messages are acknowledged."""
    summary = "%s messages sent and acknowledged\n" % n
    return summary


def send_abort_expect(n=MESSAGES):
    """Return the expected send-abort output after n messages are aborted."""
    summary = "%s messages started and aborted\n" % n
    return summary

class Broker(Server):
    """Server that launches the "broker" example with kill_me=True."""

    def __init__(self):
        # Same argument vector as before: executable name, "", and "0".
        command = ["broker", "", "0"]
        super(Broker, self).__init__(command, kill_me=True)

class ExampleTest(unittest.TestCase):
    """Run the example executables and compare their output with the expected text."""

    def runex(self, name, port, messages=MESSAGES):
        """Run an example with standard arguments, return output

        name: executable to run.
        port: placed in the argument list as-is (callers pass a server's .port).
        messages: message count; converted with str() before being passed.
        """
        return check_output([name, "", port, "xtest", str(messages)])

    def startex(self, name, port, messages=MESSAGES):
        """Start an example sub-process with standard arguments

        Takes the same arguments as runex() but returns the Popen object
        instead of waiting for the output.
        """
        return Popen([name, "", port, "xtest", str(messages)])

    def test_send_receive(self):
        """Send first then receive"""
        with Broker() as b:
            self.assertEqual(send_expect(), self.runex("send", b.port))
            self.assertMultiLineEqual(receive_expect(), self.runex("receive", b.port))

    def test_receive_send(self):
        """Start receiving first, then send."""
        with Broker() as b:
            r = self.startex("receive", b.port)
            self.assertEqual(send_expect(), self.runex("send", b.port))
            self.assertMultiLineEqual(receive_expect(), r.communicate()[0])

    def test_send_direct(self):
        """Send to direct server"""
        d = Server(["direct", "", "0"])
        self.assertEqual(send_expect(), self.runex("send", d.port))
        self.assertMultiLineEqual(receive_expect(), d.communicate()[0])

    def test_receive_direct(self):
        """Receive from direct server"""
        d = Server(["direct", "", "0"])
        self.assertMultiLineEqual(receive_expect(), self.runex("receive", d.port))
        # Use the shared helper rather than a hard-coded "10 ..." literal so the
        # expectation follows MESSAGES, as test_send_direct already does.
        self.assertEqual(send_expect(), d.communicate()[0])

    def test_send_abort_broker(self):
        """Sending aborted messages to a broker"""
        with Broker() as b:
            self.assertEqual(send_expect(), self.runex("send", b.port))
            self.assertEqual(send_abort_expect(), self.runex("send-abort", b.port))
            # The broker is expected to print one line per aborted message.
            for _ in range(MESSAGES):
                self.assertEqual("Message aborted\n", b.stdout.readline())
            self.assertEqual(send_expect(), self.runex("send", b.port))
            # Two successful batches were sent; the aborted batch is not counted.
            total = 2 * MESSAGES
            expect = (receive_expect_messages(MESSAGES) +
                      receive_expect_messages(MESSAGES) +
                      receive_expect_total(total))
            self.assertMultiLineEqual(expect, self.runex("receive", b.port, total))

    def test_send_abort_direct(self):
        """Send aborted messages to the direct server"""
        # Two successful batches are sent below, so ask for 2 * MESSAGES.
        total = 2 * MESSAGES
        d = Server(["direct", "", "0", "examples", str(total)])
        self.assertEqual(send_expect(), self.runex("send", d.port))
        self.assertEqual(send_abort_expect(), self.runex("send-abort", d.port))
        self.assertEqual(send_expect(), self.runex("send", d.port))
        expect = (receive_expect_messages() +
                  "Message aborted\n" * MESSAGES +
                  receive_expect_messages() +
                  receive_expect_total(total))
        # Show the complete diff on failure; the expected text is long.
        self.maxDiff = None
        self.assertMultiLineEqual(expect, d.communicate()[0])

    def test_send_ssl_receive(self):
        """Send with SSL, then receive

        If the example fails with output starting "error initializing SSL",
        a skip notice is printed and the test returns without failing.
        """
        try:
            with Broker() as b:
                got = self.runex("send-ssl", b.port)
                self.assertIn("secure connection:", got)
                self.assertIn(send_expect(), got)
                self.assertMultiLineEqual(receive_expect(), self.runex("receive", b.port))
        except TestProcessError as e:
            output = e.output
            # NOTE(review): the type of e.output is set by test_subprocess and is
            # not visible here. The other tests compare process output with text,
            # and on Python 3 str.startswith(bytes) raises TypeError, which would
            # hide the real error. Normalise to text so both cases are handled.
            if isinstance(output, bytes):
                output = output.decode("utf-8", "replace")
            if output and output.startswith("error initializing SSL"):
                print("Skipping %s: SSL not available" % self.id())
            else:
                raise

# Run this module's tests when it is executed as a script.
if __name__ == "__main__":
    unittest.main()
